import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;


/**
 * Demo job showing three ways of invoking the user-defined {@code HashFunction}
 * on the {@code product} column of a small in-memory table:
 * <ol>
 *   <li>inline in the Table API, passing the function class without registering it;</li>
 *   <li>by name in the Table API, after registering it as a temporary system function;</li>
 *   <li>by name in a SQL query.</li>
 * </ol>
 *
 * <p>{@code Order} and {@code HashFunction} are declared outside this file;
 * NOTE(review): {@code HashFunction} is presumably a Flink scalar function — confirm
 * against its definition.
 */
public class ScalarFunctions {

    /**
     * Builds the three example queries, attaches a print sink to each result,
     * and submits the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails during construction or execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        DataStream<Order> orders = streamEnv.fromCollection(
                Arrays.asList(
                        new Order(1L, "beer", 3),
                        new Order(3L, "rubber", 2),
                        new Order(1L, "diaper", 4)));

        // The view is created so the stream can conveniently be queried by name, including from SQL.
        tableEnv.createTemporaryView("MyTable", orders);

        // 1) Call the function "inline" in the Table API, without registering it.
        Table inlineResult = tableEnv
                .from("MyTable")
                .select(call(HashFunction.class, $("product")));
        printRows(tableEnv, inlineResult);

        // Register the function so it can be referenced by name from here on.
        tableEnv.createTemporarySystemFunction("HashFunction", HashFunction.class);

        // 2) Call the registered function by name in the Table API.
        Table registeredResult = tableEnv
                .from("MyTable")
                .select(call("HashFunction", $("product")));
        printRows(tableEnv, registeredResult);

        // 3) Call the registered function in SQL.
        Table sqlResult = tableEnv.sqlQuery("SELECT HashFunction(product) FROM MyTable");
        printRows(tableEnv, sqlResult);

        // Submit the assembled pipeline.
        streamEnv.execute();
    }

    /** Converts {@code table} to an append-only stream of {@link Row}s and attaches a print sink. */
    private static void printRows(StreamTableEnvironment tableEnv, Table table) {
        tableEnv.toAppendStream(table, Row.class).print();
    }

}
